package com.flow.framework.mq.producer;

import com.flow.framework.common.constant.FrameworkCommonConstant;
import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.common.json.JsonObject;
import com.flow.framework.common.util.date.DateUtil;
import com.flow.framework.common.util.verify.VerifyUtil;
import com.flow.framework.core.constant.FrameworkCoreConstant;
import com.flow.framework.core.holder.SecurityContextHolder;
import com.flow.framework.core.holder.SystemVersionContextHolder;
import com.flow.framework.core.pojo.dto.base.notify.BaseNotifyDto;
import com.flow.framework.core.system.helper.AsyncHelper;
import com.flow.framework.core.toolkit.IdentifierGeneratorSequence;
import com.flow.framework.mq.constant.FrameworkMqConstant;
import com.flow.framework.mq.pojo.bo.QueueBo;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

/**
 * MQ消息发送客户端
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/3/26
 */
@Slf4j
@RequiredArgsConstructor
public class MessageQueueClient {

    private final RabbitTemplate rabbitTemplate;

    private final IdentifierGeneratorSequence identifierGeneratorSequence;

    /**
     * 发送消息并确认消息是否发送成功
     *
     * @param queueBo 消息发送对象
     * @param dto     通知消息
     */
    public <T extends BaseNotifyDto> void sendAndConfirm(QueueBo queueBo, T dto) {
        if (null == queueBo) {
            log.error("queue biz object is null");
            throw new CheckedException(SystemErrorCode.MQ_ERROR);
        }
        String bizCode = queueBo.getBizCode();
        String handleMsgServiceCode = queueBo.getHandleMsgServiceCode();
        if (VerifyUtil.hasEmpty(bizCode, handleMsgServiceCode, dto)) {
            log.error("biz code or service code or message id or dto is empty. biz code: {}, service code: {}",
                    bizCode, handleMsgServiceCode);
            throw new CheckedException(SystemErrorCode.MQ_ERROR);
        }
        String traceId = MDC.get(FrameworkCommonConstant.GLOBAL_LOG_TRACE_KEY);
        if (VerifyUtil.isEmpty(traceId)) {
            traceId = AsyncHelper.randomAsyncTraceId();
        }
        String message = JsonObject.toString(dto);
        String messageId = String.valueOf(identifierGeneratorSequence.nextId());
        MessageProperties messageProperties = MessagePropertiesBuilder.newInstance()
                .setHeader(FrameworkCommonConstant.GLOBAL_LOG_TRACE_KEY, traceId)
                .setHeader(FrameworkCoreConstant.BIZ_TENANT_ID_HEADER_KEY, SecurityContextHolder.getTenantIdQuietly())
                .setHeader(FrameworkCommonConstant.SYSTEM_VERSION_KEY, SystemVersionContextHolder.getCurrentSystemVersion())
                .setHeader(FrameworkMqConstant.MQ_SEND_DATE_TIME_KEY, DateUtil.formatLocalDateTime(LocalDateTime.now()))
                .setMessageId(messageId)
                .build();

        AtomicReference<SendMsgResult> sendMsgResultRef = new AtomicReference<>();
        CorrelationData correlationData = new CorrelationData(messageId);
        correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                sendMsgResultRef.set(new SendMsgResult(ex, null));
                log.error("send message queue error. message :{}", message, ex);
                throw new CheckedException(SystemErrorCode.SYSTEM_NOT_SUPPORT_ERROR, "send message queue error.", ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                log.error("send message queue error. message :{}", message);
                sendMsgResultRef.set(new SendMsgResult(null, result));
            }
        });

        rabbitTemplate.sendAndReceive(
                bizCode + FrameworkMqConstant.MQ_COMBINATION_SPLIT_LINE + handleMsgServiceCode,
                bizCode + FrameworkMqConstant.MQ_COMBINATION_SPLIT_LINE + handleMsgServiceCode,
                new Message(message.getBytes(StandardCharsets.UTF_8), messageProperties), correlationData);
        SendMsgResult sendMsgResult = sendMsgResultRef.get();
        Throwable error = sendMsgResult.getError();
        if (null != error) {
            throw new CheckedException(SystemErrorCode.SYSTEM_NOT_SUPPORT_ERROR, "send message queue error.", error);
        }

        CorrelationData.Confirm confirm = sendMsgResult.getConfirm();
        if (null != confirm && !confirm.isAck()) {
            String reason = confirm.getReason();
            log.error("send message queue error. reason : {}, message :{}", reason, message);
            throw new CheckedException(SystemErrorCode.SYSTEM_NOT_SUPPORT_ERROR, reason);
        }
    }

    @Getter
    @AllArgsConstructor
    private static class SendMsgResult {

        private Throwable error;

        private CorrelationData.Confirm confirm;
    }
}
